package com.lianda.state;

import com.lianda.connectors.utils.GsonUtil;
import com.lianda.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Test utility that publishes hard-coded user-event JSON messages to Kafka, for
 * feeding the broadcast-state Flink job during local testing.
 */
public class BroadcastStateTestUtil {

    /** Kafka bootstrap server address. */
    public static final String broker_list = "localhost:9092";
    /** Kafka topic; must match the topic the Flink program consumes. */
    public static final String topic = "user_info";

    /**
     * Sends two hard-coded user-event JSON records to {@link #topic} and flushes.
     *
     * @throws InterruptedException retained in the signature for backward
     *         compatibility with existing callers (nothing here interrupts).
     */
    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("group.id", "user-info-test");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources: KafkaProducer is AutoCloseable; the original leaked it.
        // Also use parameterized types instead of raw KafkaProducer/ProducerRecord.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String user1 = "{\"userID\":\"user_3\",\"eventTime\":\"2019-08-17 12:19:47\",\"eventType\":\"browse\",\"productID\":1}";
            sendAndLog(producer, user1);

            String user2 = "{\"userID\":\"user_2\",\"eventTime\":\"2019-08-17 12:19:48\",\"eventType\":\"click\",\"productID\":1}";
            sendAndLog(producer, user2);

            producer.flush();
        }
    }

    /** Sends one JSON payload (null partition/key, as before) and logs it to stdout. */
    private static void sendAndLog(KafkaProducer<String, String> producer, String payload) {
        producer.send(new ProducerRecord<>(topic, null, null, payload));
        System.out.println("发送数据: " + payload);
    }

    public static void main(String[] args) throws Exception {
        // Bug fix: the original 'while (i < 3)' never incremented i, so it looped
        // forever. Send exactly three batches, 10 s apart, as intended.
        for (int i = 0; i < 3; i++) {
            writeToKafka();
            Thread.sleep(10000);
        }
    }
}
